fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators#21170
fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators#21170zhuqi-lucas wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Fixes a correctness bug in the physical optimizer where EnforceDistribution could drop a fetch (LIMIT) that had been pushed down into distribution-changing operators (CoalescePartitionsExec / SortPreservingMergeExec), leading to extra/duplicate rows for multi-partition inputs.
Changes:
- Track and propagate
fetchwhile stripping distribution-changing operators so it can be re-applied later. - Extend
add_merge_on_topto optionally applyfetchwhen re-addingSortPreservingMergeExec. - Add regression tests ensuring
fetchsurvivesEnforceDistributionacross coalesce and SPM cases.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
datafusion/physical-optimizer/src/enforce_distribution.rs |
Captures fetch while stripping dist-changing ops; attempts to reapply it during merge insertion or via a fallback wrapper. |
datafusion/core/tests/physical_optimizer/enforce_distribution.rs |
Adds regression tests to ensure fetch is preserved through EnforceDistribution. |
Comments suppressed due to low confidence (1)
datafusion/physical-optimizer/src/enforce_distribution.rs:992
add_merge_on_toponly applies the strippedfetchto a newly createdSortPreservingMergeExec, but not to theCoalescePartitionsExecbranch. If the child has no ordering, this leavesfetchunconsumed and triggers the fallback wrapper later, potentially producing redundantCoalescePartitionsExecnodes and extra overhead. Consider applyingfetch.take()toCoalescePartitionsExec::new(...).with_fetch(...)as well when adding the merge/coalesce on top.
let new_plan = if let Some(req) = input.plan.output_ordering() {
Arc::new(
SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
.with_fetch(fetch.take()),
) as _
} else {
// If there is no input order, we can simply coalesce partitions:
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
};
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
836549e to
c8cc0b3
Compare
rkrishn7
left a comment
There was a problem hiding this comment.
Thanks @zhuqi-lucas! Left a couple comments
| if let Some(fetch_val) = fetch.take() { | ||
| let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm { | ||
| // Re-insert the original SortPreservingMergeExec with fetch. | ||
| spm.with_fetch(Some(fetch_val)).unwrap() |
There was a problem hiding this comment.
Hmm, is spm actually stale here? It was captured prior to processing of the plan's children above. I think we might want to use the SortPreservingMergeExec constructor directly here with the new plan.
There was a problem hiding this comment.
Good catch! Yes, spm was indeed stale — it was captured in remove_dist_changing_operators before the child subtree went through ensure_distribution, so it referenced the old (pre-rewrite) child plan.
Fixed in 8d2ee25: instead of capturing the entire Arc<dyn ExecutionPlan>, we now only capture the LexOrdering from the SPM. In the fallback path, we reconstruct a fresh SortPreservingMergeExec::new(ordering, dist_context.plan) using the current (rewritten) child. This also eliminates the unwrap().
Additionally, the spm capture was gated on fetch.is_none(), which meant it would be skipped when an outer operator (e.g. CoalescePartitionsExec) already set fetch. I decoupled it to use spm_ordering.is_none() instead. Added a regression test (nested_coalesce_over_spm_preserves_spm_ordering) that confirms the old code produced CoalescePartitionsExec (losing sort semantics) while the fix correctly produces SortPreservingMergeExec.
| } else { | ||
| // The fetch came from a CoalescePartitionsExec. Re-introduce | ||
| // it as a CoalescePartitionsExec(fetch=N) wrapping the output. | ||
| Arc::new( |
There was a problem hiding this comment.
Do we want to check if the new plan outputs more than a single partition? And if so, use GlobalLimitExec instead of CoalescePartitionsExec?
Not sure this one matters too much though.
There was a problem hiding this comment.
Good point. In the current flow, when fetch is unconsumed it means add_merge_on_top didn't fire (either UnspecifiedDistribution or single partition). For single partition the CoalescePartitionsExec is essentially a no-op wrapper with fetch. For multi-partition cases, the fetch was already consumed by add_merge_on_top. So in practice I think the current behavior is correct, but I'll keep an eye on edge cases. Thanks for raising it!
8d2ee25 to
5b0a42e
Compare
…ibution-changing operators When `LimitPushdown` merges a `GlobalLimitExec` into a `CoalescePartitionsExec` or `SortPreservingMergeExec` as a `fetch` value, `EnforceDistribution` would strip and re-insert these operators without preserving the fetch. This silently drops the LIMIT for queries over multi-partition sources. The fix captures the `fetch` in `remove_dist_changing_operators` and threads it through `add_merge_on_top`, or re-introduces it as a fallback when the merge operator is not re-inserted. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…re from fetch - Capture only the LexOrdering from the stripped SortPreservingMergeExec instead of the entire plan node, then reconstruct a fresh SPM with the current (possibly rewritten) child plan in the fallback path. This avoids reusing a stale SPM that references an outdated subtree. - Decouple spm_ordering capture from fetch — use `spm_ordering.is_none()` instead of gating on `fetch.is_none()`, so the SPM ordering is recorded even when an outer operator already set fetch. - Remove unwrap() on with_fetch by constructing SPM directly. - Fix test comment: fallback is CoalescePartitionsExec/SPM, not GlobalLimitExec. - Add regression test: nested CoalescePartitionsExec(fetch=5) over SortPreservingMergeExec(fetch=3) now correctly preserves SPM ordering.
5b0a42e to
d861c98
Compare
|
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. |
Which issue does this PR close?
Rationale for this change
When
LimitPushdownmerges aGlobalLimitExecinto aCoalescePartitionsExecorSortPreservingMergeExecas afetchvalue, theEnforceDistributionoptimizer rule strips and re-inserts these distribution-changing operators without preserving thefetch. This silently drops the LIMIT for queries over multi-partition sources, potentially returning duplicate/extra rows.What changes are included in this PR?
remove_dist_changing_operatorsnow captures anyfetchvalue and theLexOrderingfrom a strippedSortPreservingMergeExec(if present) before stripping operators. Only the ordering is captured — not the full plan node — so that a fresh SPM can be reconstructed later with the current (possibly rewritten) child plan, avoiding stale references.add_merge_on_topaccepts an optionalfetchand applies it to the newly createdSortPreservingMergeExecorCoalescePartitionsExec.fetchwas not consumed byadd_merge_on_top(e.g., when the parent hadUnspecifiedDistributionor the child already had a single partition), the limit is re-introduced as a wrapping operator so it is never silently lost. When the original operator was an SPM, a freshSortPreservingMergeExecis reconstructed from the captured ordering to preserve sort semantics.spm_ordering.is_none()instead of gating onfetch.is_none(), so the SPM ordering is recorded even when an outer operator (e.g.CoalescePartitionsExec) already setfetch.Are these changes tested?
Yes, four tests are added:
coalesce_partitions_fetch_preserved_by_enforce_distribution— unsorted multi-partition source withCoalescePartitionsExec(fetch=1)coalesce_partitions_fetch_preserved_sorted— sorted multi-partition source withCoalescePartitionsExec(fetch=5)spm_fetch_preserved_by_enforce_distribution— sorted multi-partition source withSortPreservingMergeExec(fetch=3)nested_coalesce_over_spm_preserves_spm_ordering— nestedCoalescePartitionsExec(fetch=5)overSortPreservingMergeExec(fetch=3), verifying that SPM ordering is preserved even when an outer operator already setfetchAre there any user-facing changes?
No API changes. Queries with LIMIT over multi-partition sources will now correctly preserve the limit through the
EnforceDistributionoptimizer pass.